{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"images/dask_horizontal.svg\" align=\"right\" width=\"30%\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Distributed"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As we have seen so far, Dask allows you to simply construct graphs of tasks with dependencies, as well as have graphs created automatically for you using functional, Numpy or Pandas syntax on data collections. None of this would be very useful, if there weren't also a way to execute these graphs, in a parallel and memory-aware way. So far we have been calling `thing.compute()` or `dask.compute(thing)` without worrying what this entails. Now we will discuss the options available for that execution, and in particular, the distributed scheduler, which comes with additional functionality.\n",
    "\n",
    "Dask comes with four available schedulers:\n",
    "- \"threaded\": a scheduler backed by a thread pool\n",
    "- \"processes\": a scheduler backed by a process pool\n",
    "- \"single-threaded\" (aka \"sync\"): a synchronous scheduler, good for debugging\n",
    "- distributed: a distributed scheduler for executing graphs on multiple machines, see below.\n",
    "\n",
    "To select one of these for computation, you can specify at the time of asking for a result, e.g.,\n",
    "```python\n",
    "myvalue.compute(scheduler=\"single-threaded\")  # for debugging\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "or set the current default, either temporarily or globally\n",
    "```python\n",
    "with dask.config.set(scheduler='processes'):\n",
    "    # set temporarily fo this block only\n",
    "    myvalue.compute()\n",
    "\n",
    "dask.config.set(scheduler='processes')\n",
    "# set until further notice\n",
    "```\n",
    "\n",
    "Lets see the difference for the familiar case of the flights data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask.dataframe as dd\n",
    "import os\n",
    "df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),\n",
    "                 parse_dates={'Date': [0, 1, 2]},\n",
    "                 dtype={'TailNum': object,\n",
    "                        'CRSElapsedTime': float,\n",
    "                        'Cancelled': bool})\n",
    "\n",
    "# Maximum non-cancelled delay\n",
    "largest_delay = df[~df.Cancelled].DepDelay.max()\n",
    "largest_delay"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# each of the following gives the same results (you can check!)\n",
    "# any surprises?\n",
    "import time\n",
    "for sch in ['threading', 'processes', 'sync']:\n",
    "    t0 = time.time()\n",
    "    _ = largest_delay.compute(scheduler=sch)\n",
    "    print(sch, time.time() - t0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Some Questions to Consider:\n",
    "\n",
    "- How much speedup is possible for this task (hint, look at the graph).\n",
    "- Given how many cores are on this machine, how much faster could the parallel schedulers be than the single-threaded scheduler.\n",
    "- How much faster was using threads over a single thread? Why does this differ from the optimal speedup?\n",
    "- Why is the multiprocessing scheduler so much slower here?\n",
    "\n",
    "\n",
    "For single-machine use, the threaded and multiprocessing schedulers are fine choices. They are solid, mature and performant, and require absolutely no set-up. As a rule of thumb, threaded will work well when the functions called release the [GIL](https://wiki.python.org/moin/GlobalInterpreterLock), whereas multiprocessing will always have a slower start-up time and suffer where a lot of communication is required between tasks. The *number* of workers is, in general, also important.\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For scaling out work across a cluster, the distributed scheduler is required. Indeed, this is now generally preferred for all work, because it gives you additional monitoring information not available in the other schedulers. (Some of this monitoring is also available with an explicit progress bar and profiler, see [here](http://dask.pydata.org/en/latest/diagnostics.html).)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Making a cluster"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Simple method"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `dask.distributed` system is composed of a single centralized scheduler and one or more worker processes. [Deploying](http://dask.pydata.org/en/latest/setup.html) a remote Dask cluster involves some additional effort. But doing things locally is just involves creating a `Client` object, which lets you interact with the \"cluster\" (local threads or processes on your machine). For more information see [here](http://dask.pydata.org/en/latest/setup/single-distributed.html). \n",
    "\n",
    "Note that `Client()` takes a lot of optional [arguments](http://distributed.readthedocs.io/en/latest/local-cluster.html#api), to configure the number of processes/threads, memory limits and other "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client\n",
    "\n",
    "# Setup a local cluster.\n",
    "# By default this sets up 1 worker per core\n",
    "client = Client()\n",
    "client.cluster"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Be sure to click the `Dashboard` link to open up the diagnostics dashboard.\n",
    "\n",
    "\n",
    "\n",
    "## Executing with the distributed client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Consider some trivial calculation, such as we've used before, where we have added sleep statements in order to simulate real work being done."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask import delayed\n",
    "import time\n",
    "\n",
    "def inc(x):\n",
    "    time.sleep(5)\n",
    "    return x + 1\n",
    "\n",
    "def dec(x):\n",
    "    time.sleep(3)\n",
    "    return x - 1\n",
    "\n",
    "def add(x, y):\n",
    "    time.sleep(7)\n",
    "    return x + y"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "By default, creating a `Client` makes it the default scheduler. Any calls to `.compute` will use the cluster your `client` is attached to, unless you specify otherwise, as above.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "x = delayed(inc)(1)\n",
    "y = delayed(dec)(2)\n",
    "total = delayed(add)(x, y)\n",
    "total.compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The tasks will appear in the web UI as they are processed by the cluster and, eventually, a result will be printed as output of the cell above. Note that the kernel is blocked while waiting for the result. The resulting tasks block graph might look something like below. Hovering over each block gives which function it related to, and how long it took to execute. ![this](images/tasks.png)\n",
    "\n",
    "You can also see a simplified version of the graph being executed on Graph pane of the dashboard, so long as the calculation is in-flight."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's return to the flights computation from before, and see what happens on the dashboard (you may wish to have both the notebook and dashboard side-by-side). How did does this perform compared to before?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%time largest_delay.compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this particular case, this should be as fast or faster than the best case, threading, above. Why do you suppose this is? You should start your reading [here](https://distributed.readthedocs.io/en/latest/#architecture), and in particular note that the distributed scheduler was a complete rewrite with more intelligence around sharing of intermediate results and which tasks run on which worker. This will result in better performance in *some* cases, but still larger latency and overhead compared to the threaded scheduler, so there will be rare cases where it performs worse. Fortunately, the dashboard now gives us a lot more [diagnostic information](https://distributed.readthedocs.io/en/latest/diagnosing-performance.html#diagnosing-performance). Look at the Profile page of the dashboard to fund out what takes the biggest fraction of CPU time for the computation we just performed?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If all you want to do is execute computations created using delayed, or run calculations based on the higher-level data collections (see the coming sections), then that is about all you need to know to scale your work up to cluster scale. However, there is more detail to know about the distributed scheduler that will help with efficient usage. See the chapter Distributed, Advanced."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise\n",
    "\n",
    "Run the following computations while looking at the diagnostics page. In each case what is taking the most time?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Number of flights\n",
    "_ = len(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Number of non-cancelled flights\n",
    "_ = len(df[~df.Cancelled])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Number of non-cancelled flights per-airport\n",
    "_ = df[~df.Cancelled].groupby('Origin').Origin.count().compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Average departure delay from each airport?\n",
    "_ = df[~df.Cancelled].groupby('Origin').DepDelay.mean().compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Average departure delay per day-of-week\n",
    "_ = df.groupby(df.Date.dt.dayofweek).DepDelay.mean().compute()"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
